package com.atguigu.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.HBaseUtil;
import com.atguigu.utils.JedisUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import redis.clients.jedis.Jedis;

public abstract class DimMapFunction<T> extends RichMapFunction<T, T> {

    private Connection connection;
    private Jedis jedis;
    private String tableName;

    public DimMapFunction(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = HBaseUtil.getConnection();
        jedis = JedisUtil.getJedis();
    }

    public abstract String getKey(T value);

    public abstract void join(T value, JSONObject dimInfo);

    @Override
    public T map(T value) throws Exception {

        //查询维表获取维度信息
        JSONObject dimInfo = DimInfoFunction.getDimInfo(jedis, connection, tableName, getKey(value));

        //补充维度信息
        if (dimInfo != null) {
            join(value, dimInfo);
        }

        return value;
    }


    @Override
    public void close() throws Exception {
        connection.close();
        jedis.close();
    }
}
